// <experimental/io_context> -*- C++ -*-

// Copyright (C) 2015-2020 Free Software Foundation, Inc.
//
// This file is part of the GNU ISO C++ Library.  This library is free
// software; you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the
// Free Software Foundation; either version 3, or (at your option)
// any later version.

// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// Under Section 7 of GPL version 3, you are granted additional
// permissions described in the GCC Runtime Library Exception, version
// 3.1, as published by the Free Software Foundation.

// You should have received a copy of the GNU General Public License and
// a copy of the GCC Runtime Library Exception along with this program;
// see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
// <http://www.gnu.org/licenses/>.

/** @file experimental/io_context
 *  This is a TS C++ Library header.
 *  @ingroup networking-ts
 */

#ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
#define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1

#pragma GCC system_header

#if __cplusplus >= 201402L

#include <atomic>
#include <chrono>
#include <forward_list>
#include <functional>
#include <system_error>
#include <thread>
#include <experimental/netfwd>
#include <experimental/executor>
#if _GLIBCXX_HAVE_UNISTD_H
# include <unistd.h>
#endif
#ifdef _GLIBCXX_HAVE_POLL_H
# include <poll.h>
#endif
#ifdef _GLIBCXX_HAVE_FCNTL_H
# include <fcntl.h>
#endif

namespace std _GLIBCXX_VISIBILITY(default)
{
_GLIBCXX_BEGIN_NAMESPACE_VERSION
namespace experimental
{
namespace net
{
inline namespace v1
{

  /** @addtogroup networking-ts
   *  @{
   */

  class __socket_impl;

  /// An ExecutionContext for I/O operations.
  class io_context : public execution_context
  {
  public:
    // types:

    /// The executor type through which work is submitted to an io_context.
    class executor_type
    {
    public:
      // construct / copy / destroy:

      executor_type(const executor_type& __other) noexcept = default;
      executor_type(executor_type&& __other) noexcept = default;

      executor_type& operator=(const executor_type& __other) noexcept = default;
      executor_type& operator=(executor_type&& __other) noexcept = default;

      // executor operations:

      // True if the calling thread's id is present in the context's call
      // stack (the ids are pushed and popped by io_context::__monitor).
      bool running_in_this_thread() const noexcept
      {
	const auto __self = this_thread::get_id();
	lock_guard<mutex> __lock(_M_ctx->_M_mtx);
	const auto& __stack = _M_ctx->_M_call_stack;
	return std::find(__stack.begin(), __stack.end(), __self)
	  != __stack.end();
      }

      // The io_context this executor was obtained from.
      io_context& context() const noexcept
      { return *_M_ctx; }

      // Adjust the context's count of outstanding work.
      void on_work_started() const noexcept
      { ++_M_ctx->_M_work_count; }

      void on_work_finished() const noexcept
      { --_M_ctx->_M_work_count; }

      // Invoke a decayed copy of __f immediately when called from a thread
      // that is running the context, otherwise hand it to post().
      template<typename _Func, typename _ProtoAllocator>
	void
	dispatch(_Func&& __f, const _ProtoAllocator& __a) const
	{
	  if (!running_in_this_thread())
	    {
	      post(std::forward<_Func>(__f), __a);
	      return;
	    }
	  decay_t<_Func>{std::forward<_Func>(__f)}();
	}

      // NOTE: __f is not stored or invoked here yet (see the TODO); the
      // only effect is to wake the reactor while holding the context mutex.
      template<typename _Func, typename _ProtoAllocator>
	void
	post(_Func&& __f, const _ProtoAllocator& __a) const
	{
	  lock_guard<mutex> __lock(_M_ctx->_M_mtx);
	  // TODO (re-use functionality in system_context)
	  _M_ctx->_M_reactor._M_notify();
	}

      // Currently identical to post().
      template<typename _Func, typename _ProtoAllocator>
	void
	defer(_Func&& __f, const _ProtoAllocator& __a) const
	{
	  post(std::forward<_Func>(__f), __a);
	}

    private:
      friend io_context;

      // Only io_context can create an executor (see get_executor()).
      explicit
      executor_type(io_context& __ctx)
      : _M_ctx(std::addressof(__ctx))
      { }

      io_context* _M_ctx;
    };

    using count_type =  size_t;

    // construct / copy / destroy:

    // Construct a context with no outstanding work.
    io_context() : _M_work_count(0) { }

    // As the default constructor; the concurrency hint is accepted but
    // not used.
    explicit
    io_context(int __concurrency_hint) : _M_work_count(0) { }

    // Not copyable.
    io_context(const io_context&) = delete;
    io_context& operator=(const io_context&) = delete;

    // io_context operations:

    // Return an executor that refers to this context.
    executor_type get_executor() noexcept { return executor_type(*this); }

    // Call run_one() repeatedly until it reports that nothing was run.
    // Returns the number of successful calls, saturating at the largest
    // count_type value.
    count_type
    run()
    {
      constexpr count_type __limit = numeric_limits<count_type>::max();
      count_type __handled = 0;
      for (;;)
	{
	  if (!run_one())
	    break;
	  if (__handled < __limit)
	    ++__handled;
	}
      return __handled;
    }

    // run_until() with a deadline of __rel_time from now, measured on
    // the steady clock.
    template<typename _Rep, typename _Period>
      count_type
      run_for(const chrono::duration<_Rep, _Period>& __rel_time)
      {
	const auto __deadline = chrono::steady_clock::now() + __rel_time;
	return run_until(__deadline);
      }

    // Call run_one_until(__abs_time) repeatedly until it reports that
    // nothing was run.  Returns the number of successful calls,
    // saturating at the largest count_type value.
    template<typename _Clock, typename _Duration>
      count_type
      run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
      {
	constexpr count_type __limit = numeric_limits<count_type>::max();
	count_type __handled = 0;
	for (;;)
	  {
	    if (!run_one_until(__abs_time))
	      break;
	    if (__handled < __limit)
	      ++__handled;
	  }
	return __handled;
      }

    // Run at most one handler via _M_do_one.  The non-zero timeout of -1ms
    // selects the blocking path there; it is forwarded to the reactor when
    // no timer is pending.  Returns 1 if a handler ran, otherwise 0.
    count_type
    run_one()
    { return _M_do_one(chrono::milliseconds{-1}); }

    // run_one_until() with a deadline of __rel_time from now, measured on
    // the steady clock.
    template<typename _Rep, typename _Period>
      count_type
      run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
      {
	const auto __deadline = chrono::steady_clock::now() + __rel_time;
	return run_one_until(__deadline);
      }

    // Run at most one handler, giving up once __abs_time is reached or the
    // context has been stopped.  Returns 1 if a handler ran, otherwise 0.
    template<typename _Clock, typename _Duration>
      count_type
      run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
      {
	using chrono::milliseconds;
	for (auto __now = _Clock::now(); __now < __abs_time;
	     __now = _Clock::now())
	  {
	    const auto __left = __abs_time - __now;
	    // Round the remaining time up to a whole millisecond.  Truncating
	    // would turn a sub-millisecond remainder into a zero timeout,
	    // i.e. a non-blocking poll repeated in a tight loop.
	    auto __ms = chrono::duration_cast<milliseconds>(__left);
	    if (__ms < __left)
	      ++__ms;
	    if (_M_do_one(__ms))
	      return 1;
	    // _M_do_one returns false immediately while the context is
	    // stopped, so retrying would only spin until the deadline.
	    if (stopped())
	      return 0;
	  }
	return 0;
      }

    // Call poll_one() repeatedly until it reports that nothing was run.
    // Returns the number of successful calls, saturating at the largest
    // count_type value.
    count_type
    poll()
    {
      constexpr count_type __limit = numeric_limits<count_type>::max();
      count_type __handled = 0;
      for (;;)
	{
	  if (!poll_one())
	    break;
	  if (__handled < __limit)
	    ++__handled;
	}
      return __handled;
    }

    // Run at most one handler via _M_do_one with a zero timeout, which
    // selects its non-blocking path.  Returns 1 if a handler ran,
    // otherwise 0.
    count_type
    poll_one()
    { return _M_do_one(chrono::milliseconds{0}); }

    // Set the stopped flag under the context mutex and notify the reactor
    // so that a thread waiting in it re-checks the flag.
    void stop()
    {
      lock_guard<mutex> __lock(_M_mtx);
      _M_stopped = true;
      _M_reactor._M_notify();
    }

    // Return the stopped flag, read under the context mutex.
    bool stopped() const noexcept
    {
      lock_guard<mutex> __lock(_M_mtx);
      return _M_stopped;
    }

    void restart()
    {
      _M_stopped = false;
    }

  private:

    template<typename _Clock, typename _WaitTraits>
      friend class basic_waitable_timer;

    friend __socket_impl;

    template<typename _Protocol>
      friend class __basic_socket_impl;

    template<typename _Protocol>
      friend class basic_socket;

    template<typename _Protocol>
      friend class basic_datagram_socket;

    template<typename _Protocol>
      friend class basic_stream_socket;

    template<typename _Protocol>
      friend class basic_socket_acceptor;

    // Outstanding work: the executor work count, plus one if any
    // asynchronous operation is queued in _M_ops.  Takes no lock itself;
    // the visible caller (~__monitor) holds _M_mtx.
    count_type
    _M_outstanding_work() const
    { return _M_work_count + !_M_ops.empty(); }

    // Type-independent interface through which io_context drives the
    // timer queue of each timer type.
    struct __timer_queue_base : execution_context::service
    {
      // return milliseconds until next timer expires, or milliseconds::max()
      virtual chrono::milliseconds _M_next() const = 0;
      // Called by _M_do_one on a queue selected via _M_next(); a true
      // result is treated as "one handler was run".
      virtual bool run_one() = 0;

    protected:
      // Register this queue in the owning io_context's list of timer
      // queues, under the context mutex.
      explicit
      __timer_queue_base(execution_context& __ctx) : service(__ctx)
      {
	auto& __ioc = static_cast<io_context&>(__ctx);
	lock_guard<mutex> __lock(__ioc._M_mtx);
	__ioc._M_timers.push_back(this);
      }

      // Locked by the derived queue around accesses to its entries.
      mutable mutex	_M_qmtx;
    };

    template<typename _Timer, typename _Key = typename _Timer::_Key>
      struct __timer_queue : __timer_queue_base
      {
	using key_type = __timer_queue;

	explicit
	__timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
	{ }

	void shutdown() noexcept { }

	io_context& context() noexcept
	{ return static_cast<io_context&>(service::context()); }

	// Start an asynchronous wait.
	void
	push(const _Timer& __t, function<void(error_code)> __h)
	{
	  context().get_executor().on_work_started();
	  lock_guard<mutex> __lock(_M_qmtx);
	  _M_queue.emplace(__t, _M_next_id++, std::move(__h));
	  // no need to notify reactor unless this timer went to the front?
	}

	// Cancel all outstanding waits for __t
	size_t
	cancel(const _Timer& __t)
	{
	  lock_guard<mutex> __lock(_M_qmtx);
	  size_t __count = 0;
	  auto __last = _M_queue.end();
	  for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
	      ++__it)
	    {
	      if (__it->_M_key == __t._M_key.get())
		{
		  __it->cancel();
		  __last = __it;
		  ++__count;
		}
	    }
	  if (__count)
	    _M_queue._M_sort_to(__last);
	  return __count;
	}

	// Cancel oldest outstanding wait for __t
	bool
	cancel_one(const _Timer& __t)
	{
	  lock_guard<mutex> __lock(_M_qmtx);
	  const auto __end = _M_queue.end();
	  auto __oldest = __end;
	  for (auto __it = _M_queue.begin(); __it != __end; ++__it)
	    if (__it->_M_key == __t._M_key.get())
	      if (__oldest == __end || __it->_M_id < __oldest->_M_id)
		__oldest = __it;
	  if (__oldest == __end)
	    return false;
	  __oldest->cancel();
	  _M_queue._M_sort_to(__oldest);
	  return true;
	}

	chrono::milliseconds
	_M_next() const override
	{
	  typename _Timer::time_point __exp;
	  {
	    lock_guard<mutex> __lock(_M_qmtx);
	    if (_M_queue.empty())
	      return chrono::milliseconds::max();  // no pending timers
	    if (_M_queue.top()._M_key == nullptr)
	      return chrono::milliseconds::zero(); // cancelled, run now
	    __exp = _M_queue.top()._M_expiry;
	  }
	  auto __dur = _Timer::traits_type::to_wait_duration(__exp);
	  if (__dur < __dur.zero())
	    __dur = __dur.zero();
	  return chrono::duration_cast<chrono::milliseconds>(__dur);
	}

      private:

	bool run_one() override
	{
	  auto __now = _Timer::clock_type::now();
	  function<void(error_code)> __h;
	  error_code __ec;
	  {
	    lock_guard<mutex> __lock(_M_qmtx);

	    if (_M_queue.top()._M_key == nullptr) // cancelled
	      {
		__h = std::move(_M_queue.top()._M_h);
		__ec = std::make_error_code(errc::operation_canceled);
		_M_queue.pop();
	      }
	    else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
	      {
		__h = std::move(_M_queue.top()._M_h);
		_M_queue.pop();
	      }
	  }
	  if (__h)
	    {
	      __h(__ec);
	      context().get_executor().on_work_finished();
	      return true;
	    }
	  return false;
	}

	using __timer_id_type = uint64_t;

	struct __pending_timer
	{
	  __pending_timer(const _Timer& __t, uint64_t __id,
			  function<void(error_code)> __h)
	  : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
	    _M_h(std::move(__h))
	  { }

	  typename _Timer::time_point _M_expiry;
	  _Key* _M_key;
	  __timer_id_type _M_id;
	  function<void(error_code)> _M_h;

	  void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }

	  bool
	  operator<(const __pending_timer& __rhs) const
	  { return _M_expiry < __rhs._M_expiry; }
	};

	struct __queue : priority_queue<__pending_timer>
	{
	  using iterator =
	    typename priority_queue<__pending_timer>::container_type::iterator;

	  // expose begin/end/erase for direct access to underlying container
	  iterator begin() { return this->c.begin(); }
	  iterator end() { return this->c.end(); }
	  iterator erase(iterator __it) { return this->c.erase(__it); }

	  void
	  _M_sort_to(iterator __it)
	  { std::stable_sort(this->c.begin(), ++__it); }
	};

	__queue	_M_queue;
	__timer_id_type _M_next_id = 0;
      };

    // Queue __h to be called when __timer expires or is cancelled, then
    // notify the reactor so that a waiting thread takes the new timer into
    // account.  __h is a forwarding reference: it is forwarded rather than
    // unconditionally moved, so an lvalue handler is copied and left
    // intact for the caller.
    template<typename _Timer, typename _CompletionHandler>
      void
      async_wait(const _Timer& __timer, _CompletionHandler&& __h)
      {
	auto& __queue = use_service<__timer_queue<_Timer>>(*this);
	__queue.push(__timer, std::forward<_CompletionHandler>(__h));
	_M_reactor._M_notify();
      }

    // Cancel all wait operations initiated by __timer.
    template<typename _Timer>
      size_t
      cancel(const _Timer& __timer)
      {
	if (!has_service<__timer_queue<_Timer>>(*this))
	  return 0;

	auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
	if (__c != 0)
	  _M_reactor._M_notify();
	return __c;
      }

    // Cancel the oldest wait operation initiated by __timer.
    template<typename _Timer>
      size_t
      cancel_one(const _Timer& __timer)
      {
	if (!has_service<__timer_queue<_Timer>>(*this))
	  return 0;

	if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
	  {
	    _M_reactor._M_notify();
	    return 1;
	  }
	return 0;
      }

    template<typename _Op>
      void
      async_wait(int __fd, int __w, _Op&& __op)
      {
	lock_guard<mutex> __lock(_M_mtx);
	// TODO need push_back, use std::list not std::forward_list
	auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
	while (__it != _M_ops.end())
	  {
	    ++__it;
	    ++__tail;
	  }
	using __type = __async_operation_impl<_Op>;
	_M_ops.emplace_after(__tail,
			     make_unique<__type>(std::move(__op), __fd, __w));
	_M_reactor._M_fd_interest(__fd, __w);
      }

    // Forward registration / removal of a file descriptor to the reactor.
    void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
    void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }

    // Cancel all pending asynchronous operations on __fd.
    //
    // Each matching operation is marked as cancelled and moved to the
    // front of _M_ops, behind any operations that were already cancelled.
    // _M_do_one only looks at the front of the list for cancelled
    // operations, so they must form a contiguous prefix.  The error_code
    // argument is not used.
    void cancel(int __fd, error_code&)
    {
      lock_guard<mutex> __lock(_M_mtx);
      const auto __end = _M_ops.end();
      auto __it = _M_ops.begin();
      auto __prev = _M_ops.before_begin();
      // Skip the leading run of operations that are already cancelled.
      while (__it != __end && (*__it)->_M_is_cancelled())
	{
	  ++__it;
	  ++__prev;
	}
      // Invariants for the loop below: __cancelled is the last node of the
      // cancelled prefix and __prev is the node immediately before __it.
      auto __cancelled = __prev;
      while (__it != __end)
	{
	  if ((*__it)->_M_fd == __fd)
	    {
	      (*__it)->cancel();
	      ++__it;
	      if (__cancelled == __prev)
		// The node already directly follows the prefix, so there is
		// nothing to move (splice_after would be a no-op), but
		// __prev still has to step over it to stay just before __it.
		++__prev;
	      else
		_M_ops.splice_after(__cancelled, _M_ops, __prev);
	      ++__cancelled;
	    }
	  else
	    {
	      ++__it;
	      ++__prev;
	    }
	}
      _M_reactor._M_not_interested(__fd);
    }

    // Base class of the type-erased asynchronous operations held in _M_ops.
    struct __async_operation
    {
      __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }

      virtual ~__async_operation() = default;

      // File descriptor the operation waits on; -1 once cancelled.
      int _M_fd;
      // Event mask that _M_do_one tests against the revents reported for
      // _M_fd.
      short _M_ev;

      // Cancellation is recorded by overwriting the descriptor.
      void cancel() { _M_fd = -1; }
      bool _M_is_cancelled() const { return _M_fd == -1; }
      // Complete the operation; called by _M_do_one after the operation
      // has been removed from _M_ops.
      virtual void run(io_context&) = 0;
    };

    // Asynchronous operation that stores a callable of type _Op and calls
    // it with an error_code when run.
    template<typename _Op>
      struct __async_operation_impl : __async_operation
      {
	__async_operation_impl(_Op&& __op, int __fd, int __ev)
	: __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }

	_Op _M_op;

	// Call the stored callable: with operation_canceled if the
	// operation was cancelled, otherwise with a default-constructed
	// (success) error_code.  The context argument is not used.
	void run(io_context& __ctx) override
	{
	  if (_M_is_cancelled())
	    _M_op(std::make_error_code(errc::operation_canceled));
	  else
	    _M_op(error_code{});
	}
      };

    atomic<count_type>		_M_work_count;
    mutable mutex		_M_mtx;
    queue<function<void()>>	_M_op;
    bool			_M_stopped = false;

    // Scope guard used by _M_do_one.  It records the calling thread in the
    // context's call stack for the duration of the call and, on exit,
    // stops the context if no work is outstanding.
    struct __monitor
    {
      // Push the current thread's id, under the context mutex.
      __monitor(io_context& __c) : _M_ctx(__c)
      {
	lock_guard<mutex> __lock(_M_ctx._M_mtx);
	_M_ctx._M_call_stack.push_back(this_thread::get_id());
      }

      // Pop the most recently pushed id.  If there is no outstanding work
      // left, mark the context as stopped and notify the reactor.
      ~__monitor()
      {
	lock_guard<mutex> __lock(_M_ctx._M_mtx);
	_M_ctx._M_call_stack.pop_back();
	if (_M_ctx._M_outstanding_work() == 0)
	  {
	    _M_ctx._M_stopped = true;
	    _M_ctx._M_reactor._M_notify();
	  }
      }

      // Declaring the move constructor as deleted also suppresses the
      // implicit copy operations.
      __monitor(__monitor&&) = delete;

      io_context& _M_ctx;
    };

    // Run at most one handler: a ready timer, a cancelled asynchronous
    // operation, or an asynchronous operation whose descriptor reported
    // one of the events it waits for.
    //
    // A zero __timeout means "do not block"; any other value is treated as
    // blocking and is used as the upper bound for the wait in the reactor.
    //
    // Returns true if a handler was run.  Returns false if the context is
    // stopped, if the wait timed out with no timer selected, or if the
    // reactor reported no ready descriptors.
    bool
    _M_do_one(chrono::milliseconds __timeout)
    {
      const bool __block = __timeout != chrono::milliseconds::zero();

      // Filled in by the reactor with the descriptors that have events.
      __reactor::__fdvec __fds;

      // Records this thread in the call stack until the function returns.
      __monitor __mon{*this};

      // Work selected by one iteration of the loop, run by the next one
      // after _M_mtx has been released.
      __timer_queue_base* __timerq = nullptr;
      unique_ptr<__async_operation> __async_op;

      while (true)
	{
	  if (__timerq)
	    {
	      if (__timerq->run_one())
		return true;
	      else
		__timerq = nullptr;
	    }

	  if (__async_op)
	    {
	      __async_op->run(*this);
	      // TODO need to unregister __async_op
	      return true;
	    }

	  // Timeout for the reactor wait below.
	  chrono::milliseconds __ms{0};

	  {
	    lock_guard<mutex> __lock(_M_mtx);

	    if (_M_stopped)
	      return false;

	    // find first timer with something to do
	    for (auto __q : _M_timers)
	      {
		auto __next = __q->_M_next();
		if (__next == __next.zero())  // ready to run immediately
		  {
		    __timerq = __q;
		    __ms = __next;
		    break;
		  }
		// When blocking, remember the queue with the shortest wait
		// seen so far.
		else if (__next != __next.max() && __block
		    && (__next < __ms || __timerq == nullptr))
		  {
		    __timerq = __q;
		    __ms = __next;
		  }
	      }

	    if (__timerq && __ms == __ms.zero())
	      continue;  // restart loop to run a timer immediately

	    // Cancelled operations are kept at the front of _M_ops; take
	    // the first one and run it on the next iteration.
	    if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
	      {
		_M_ops.front().swap(__async_op);
		_M_ops.pop_front();
		continue;
	      }

	    // TODO run any posted items

	    // Choose how long to wait: the caller's timeout if no timer is
	    // pending or if it is non-negative and shorter than the timer
	    // wait, otherwise the timer wait limited to INT_MAX ms.
	    if (__block)
	      {
		if (__timerq == nullptr)
		  __ms = __timeout;
		else if (__ms.zero() <= __timeout && __timeout < __ms)
		  __ms = __timeout;
		else if (__ms.count() > numeric_limits<int>::max())
		  __ms = chrono::milliseconds{numeric_limits<int>::max()};
	      }
	    // else __ms == 0 and poll() will return immediately

	  }

	  // Wait for events without holding _M_mtx.
	  auto __res = _M_reactor.wait(__fds, __ms);

	  if (__res == __reactor::_S_retry)
	    continue;

	  if (__res == __reactor::_S_timeout)
	    if (__timerq == nullptr)
	      return false;
	    else
	      continue;  // timed out, so restart loop and process the timer

	  // Descriptor events take precedence over the selected timer; it
	  // is looked up again on the next iteration.
	  __timerq = nullptr;

	  if (__fds.empty()) // nothing to do
	    return false;

	  // Take the first queued operation whose descriptor reported one
	  // of the events the operation waits for.  If none matches, the
	  // outer loop simply goes round again.
	  lock_guard<mutex> __lock(_M_mtx);
	  for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
	      __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
	    {
	      auto& __op = **__it;
	      auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
		  __op._M_fd,
		  [](const auto& __p, int __fd) { return __p.fd < __fd; });
	      if (__pos != __fds.end() && __pos->fd == __op._M_fd
		  && __pos->revents & __op._M_ev)
		{
		  __it->swap(__async_op);
		  _M_ops.erase_after(__prev);
		  break;  // restart loop and run op
		}
	    }
	}
    }

    struct __reactor
    {
      __reactor() : _M_fds(1)
      {
	int __pipe[2];
	if (::pipe(__pipe) == -1)
	  __throw_system_error(errno);
	if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
	    || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
	  {
	    int __e = errno;
	    ::close(__pipe[0]);
	    ::close(__pipe[1]);
	    __throw_system_error(__e);
	  }
	_M_fds.back().events	= POLLIN;
	_M_fds.back().fd	= __pipe[0];
	_M_notify_wr		= __pipe[1];
      }

      ~__reactor()
      {
	::close(_M_fds.back().fd);
	::close(_M_notify_wr);
      }

      // write a notification byte to the pipe (ignoring errors)
      void _M_notify()
      {
	int __n;
	do {
	  __n = ::write(_M_notify_wr, "", 1);
	} while (__n == -1 && errno == EINTR);
      }

      // read all notification bytes from the pipe
      void _M_on_notify()
      {
	// Drain the pipe.
	char __buf[64];
	ssize_t __n;
	do {
	  __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
	} while (__n != -1 || errno == EINTR);
      }

      void
      _M_add_fd(int __fd)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  __throw_system_error((int)errc::invalid_argument);
	_M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
	_M_notify();
      }

      void
      _M_remove_fd(int __fd)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  _M_fds.erase(__pos);
	// else bug!
	_M_notify();
      }

      void
      _M_fd_interest(int __fd, int __w)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  __pos->events |= __w;
	// else bug!
	_M_notify();
      }

      void
      _M_not_interested(int __fd)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  __pos->events = 0;
	_M_notify();
      }

# ifdef _GLIBCXX_HAVE_POLL_H
      using __fdvec = vector<::pollfd>;

      // Find first element p such that !(p.fd < __fd)
      // N.B. always returns a dereferencable iterator.
      __fdvec::iterator
      _M_lower_bound(int __fd)
      {
	return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
	    __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
      }

      enum __status { _S_retry, _S_timeout, _S_ok, _S_error };

      __status
      wait(__fdvec& __fds, chrono::milliseconds __timeout)
      {
	// XXX not thread-safe!
	__fds = _M_fds;  // take snapshot to pass to poll()

	int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());

	if (__res == -1)
	  {
	    __fds.clear();
	    if (errno == EINTR)
	      return _S_retry;
	    return _S_error; // XXX ???
	  }
	else if (__res == 0)
	  {
	    __fds.clear();
	    return _S_timeout;
	  }
	else if (__fds.back().revents != 0) // something changed, restart
	  {
	    __fds.clear();
	    _M_on_notify();
	    return _S_retry;
	  }

	auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
	      [](const __fdvec::value_type& __p) { return __p.revents != 0; });
	__fds.erase(__part, __fds.end());

	return _S_ok;
      }

      __fdvec _M_fds;	// _M_fds.back() is the read end of the self-pipe
#endif
      int _M_notify_wr;	// write end of the self-pipe
    };

    __reactor _M_reactor;

    vector<__timer_queue_base*>			_M_timers;
    forward_list<unique_ptr<__async_operation>>	_M_ops;

    vector<thread::id>	_M_call_stack;
  };

  // Two executors are equal if they refer to the same io_context.
  inline bool
  operator==(const io_context::executor_type& __a,
	     const io_context::executor_type& __b) noexcept
  {
    // https://github.com/chriskohlhoff/asio-tr2/issues/201
    using executor_type = io_context::executor_type;
    const io_context* const __lhs
      = std::addressof(executor_type(__a).context());
    const io_context* const __rhs
      = std::addressof(executor_type(__b).context());
    return __lhs == __rhs;
  }

  // Negation of operator== for io_context executors.
  inline bool
  operator!=(const io_context::executor_type& __a,
	     const io_context::executor_type& __b) noexcept
  {
    const bool __same = (__a == __b);
    return !__same;
  }

  // Make the is_executor trait report true for io_context::executor_type.
  template<> struct is_executor<io_context::executor_type> : true_type {};

  /// @}

} // namespace v1
} // namespace net
} // namespace experimental
_GLIBCXX_END_NAMESPACE_VERSION
} // namespace std

#endif // C++14

#endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE
